package com.my;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;


public class App {
        public static void main(String[] args) throws Exception {
            ArrayList<String> list = new ArrayList<>();
            list.add("hello world");
            list.add("hello java");
            list.add("hello flink");
            // 创建 streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // 获取数据
//            DataStreamSource<String> stream = env.fromCollection(list);
            DataStreamSource<String> stream = env.socketTextStream("hadoop102",7777);
            stream.print(">>>");
            // 计数
            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                    .keyBy(0)
                    .sum(1);

            sum.addSink(new SinkFunction<Tuple2<String, Integer>>() {
                @Override
                public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
                    System.out.println("单词"+"'"+value.f0+"'"+"出现了"+value.f1+"次");
                }
            });

            env.execute("WordCount");
        }

        public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
                String[] tokens = s.toLowerCase().split("\\W+");

                for (String token: tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }

}
